Because this data pipeline runs in batches keyed on a date (execute_date), and the source data cannot always be trusted to arrive on time, we have to allow for backfills (reruns). The date therefore has to be an injectable parameter of the Jenkins pipeline.
There are two ways to get the date into the pipeline:
Date Parameter
This approach also requires installing the Date Parameter Plugin.
A helpful soul on StackOverflow posted a refinement of the first approach:
properties([parameters([
[$class: 'DateParameterDefinition',
name: 'EXECUTE_DATE',
dateFormat: 'yyyy-MM-dd',
defaultValue: 'LocalDate.now()']
])])
pipeline {
...
}
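Once defined this way, the parameter shows up under "Build with Parameters". Inside the pipeline it can be read as params.EXECUTE_DATE in Groovy, and Jenkins also exposes parameters to sh steps as environment variables. A minimal sketch (the stage name is just for illustration):

stage("Show execute date") {
    steps {
        // Groovy interpolation (double-quoted string)
        echo "Running for ${params.EXECUTE_DATE}"
        // single-quoted shell string: the shell expands the env var, not Groovy
        sh 'echo "Running for ${EXECUTE_DATE}"'
    }
}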
Once the daily crawler output has been written to GCS and its correctness has been confirmed, we can write that single day's (execute_date) batch into the history table.
// update_content_info_hist.sql
DELETE FROM `ithome-jenkins-2022.ithome.content_info_hist`
WHERE DATE(crawl_datetime) = @execute_date;
INSERT INTO `ithome-jenkins-2022.ithome.content_info_hist`
SELECT
`_id`,
`crawl_datetime`,
`text`,
`user_id`,
`ironman_id`,
`title`,
`like`,
`comment`,
`view`,
`article_id`,
`article_url`,
`create_datetime`
FROM `ithome-jenkins-2022.ithome.content_info_tmp`
WHERE DATE(crawl_datetime) = @execute_date
;
In theory this should be a partition drop, but BigQuery doesn't seem to offer a DROP PARTITION statement in SQL, so for now I settled for a rather ugly workaround: scan the table and DELETE the rows. I'll find time to study BigQuery properly later.
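That said, the bq CLI can delete a single partition by addressing it with the $ decorator. A sketch, assuming content_info_hist is day-partitioned on crawl_datetime (the date below is just an example):

# delete only the 2022-10-01 partition instead of running a DELETE over the table
bq rm -t 'ithome-jenkins-2022:ithome.content_info_hist$20221001'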
BigQuery command
cat update_content_info_hist.sql | bq query \
--nouse_legacy_sql \
--parameter execute_date:DATE:"${EXECUTE_DATE}"
Jenkinsfile
stage("Append to history table"){
steps{
sh '''
cat sql/update_content_info_hist.sql | bq query \
--nouse_legacy_sql \
--parameter execute_date:DATE:"${params.EXECUTE_DATE}"
'''
}
}
This verification logic could also be written as its own Jenkins stage (recommended), as sketched below.
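A minimal sketch of such a stage (the stage name and the zero-row threshold are my own assumptions; a real check might compare row counts against the source instead):

stage("Verify history table"){
    steps{
        sh '''
            # count the rows that landed for the execution date
            ROW_COUNT=$(bq query --format=csv --nouse_legacy_sql \
                --parameter execute_date:DATE:"${EXECUTE_DATE}" \
                'SELECT COUNT(*) AS cnt FROM `ithome-jenkins-2022.ithome.content_info_hist` WHERE DATE(crawl_datetime) = @execute_date' \
                | tail -n 1)
            echo "Rows for ${EXECUTE_DATE}: ${ROW_COUNT}"
            # fail the build if nothing was written
            if [ "${ROW_COUNT}" -eq 0 ]; then exit 1; fi
        '''
    }
}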
// overwrite_content_info_latest.sql
TRUNCATE TABLE `ithome-jenkins-2022.ithome.content_info_latest`;
INSERT INTO `ithome-jenkins-2022.ithome.content_info_latest`
SELECT
`_id`,
`crawl_datetime`,
`text`,
`user_id`,
`ironman_id`,
`title`,
`like`,
`comment`,
`view`,
`article_id`,
`article_url`,
`create_datetime`
FROM `ithome-jenkins-2022.ithome.content_info_hist`
WHERE DATE(crawl_datetime) = @execute_date
;
This rewrites the entire ithome.content_info_latest table with the data for @execute_date.
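As a side note, TRUNCATE plus INSERT are two separate statements, so the refresh is not atomic. If that matters, BigQuery's CREATE OR REPLACE TABLE can do it in one statement; a sketch (as far as I know, bq query parameters are not accepted inside DDL, so the date would have to be inlined or templated into the SQL file rather than passed via --parameter):

-- atomic full refresh: old contents are swapped out only once the SELECT succeeds
CREATE OR REPLACE TABLE `ithome-jenkins-2022.ithome.content_info_latest` AS
SELECT *
FROM `ithome-jenkins-2022.ithome.content_info_hist`
WHERE DATE(crawl_datetime) = DATE('2022-10-01');  -- inline the execution date here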
BigQuery command
cat overwrite_content_info_latest.sql | bq query \
--nouse_legacy_sql \
--parameter execute_date:DATE:"${EXECUTE_DATE}"
Jenkinsfile
stage("Update to latest table"){
steps{
sh '''
cat sql/overwrite_content_info_latest.sql | bq query \
--nouse_legacy_sql \
--parameter execute_date:DATE:"${params.EXECUTE_DATE}"
'''
}
}
// overwrite_content_info_view_change.sql
TRUNCATE TABLE `ithome-jenkins-2022.ithome.content_info_view_change`;
INSERT INTO `ithome-jenkins-2022.ithome.content_info_view_change` (
`ironman_id`,
`article_id`,
`view`,
`crawl_datetime`,
`latest_datetime`
)
SELECT
`ironman_id`,
`article_id`,
`view`,
`crawl_datetime`,
-- boolean flag marking the rows crawled on @execute_date (despite the datetime-ish name)
DATE(crawl_datetime) = @execute_date AS `latest_datetime`
FROM
`ithome-jenkins-2022.ithome.content_info_hist`
WHERE DATE(crawl_datetime) <= @execute_date
;
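To make the intent of this table concrete, here is a hypothetical query of the kind a report could run on it, computing each article's view growth up to the execution date (the aggregation is my own illustration, not part of the pipeline):

-- total views gained per article between its first crawl and the latest one
SELECT
  article_id,
  MAX(IF(latest_datetime, `view`, NULL)) - MIN(`view`) AS view_growth
FROM `ithome-jenkins-2022.ithome.content_info_view_change`
GROUP BY article_id;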
BigQuery command
cat overwrite_content_info_view_change.sql | bq query \
--nouse_legacy_sql \
--parameter execute_date:DATE:"${EXECUTE_DATE}"
Jenkinsfile
stage("Update to latest table"){
steps{
sh '''
cat sql/overwrite_content_info_view_change.sql | bq query \
--nouse_legacy_sql \
--parameter execute_date:DATE:"${params.EXECUTE_DATE}"
'''
}
}
The complete Jenkinsfile:
properties([parameters([
[$class: 'DateParameterDefinition',
name: 'EXECUTE_DATE',
dateFormat: 'yyyy-MM-dd',
defaultValue: 'LocalDateTime.now().plusHours(8)'] // shift by +8 hours (UTC agent -> Taiwan time); LocalDate has no plusHours()
])])
pipeline {
agent{
label "gcp-agent-1"
}
environment {
MONGO_HOST = "mongodb://localhost:27017"
MONGO_DB = "ithome_ironman"
}
stages {
stage('Data pipeline(stage 1)') {
matrix {
axes {
axis {
name 'DATA'
values 'user_info', 'content_info'
}
}
stages {
stage("Pull mongo data"){
steps{
sh """
python3 mongo_client.py -c ${DATA} \
to-csv --csv-file-path output/${DATA}/${DATA}.csv
"""
}
}
stage("Check mongo data"){
steps{
sh '''
MONGO_DATA_COUNT=$(python3 mongo_client.py -c ${DATA} count-data --contain-header)
CSV_DATA_COUNT=$(cat output/${DATA}/${DATA}.csv|wc -l)
echo "Mongo data count: ${MONGO_DATA_COUNT}"
echo "CSV data count: ${CSV_DATA_COUNT}"
# fail the build when the CSV row count does not match the Mongo count
if [ "${MONGO_DATA_COUNT}" -ne "${CSV_DATA_COUNT}" ]; then exit 1; fi
'''
}
}
stage("Check data quality"){
steps{
sh """
docker run -v ${WORKSPACE}/output/${DATA}/:/usr/src/github/ piperider run
"""
sh '''
sudo rm -rf ${WORKSPACE}/output/${DATA}/.piperider/outputs/latest
sudo ln -s ${WORKSPACE}/output/${DATA}/.piperider/outputs/$(ls ${WORKSPACE}/output/${DATA}/.piperider/outputs | grep ithome|tail -n1) ${WORKSPACE}/output/${DATA}/.piperider/outputs/latest
'''
sh "python3 get_piperider_result.py --data-source-name ${DATA} "
}
}
stage("Push to GCS"){
steps{
sh """
gcloud alpha storage cp output/${DATA}/${DATA}.csv gs://crawler_result/ithome/ironman2022
"""
}
}
stage("Append to history table"){
steps{
sh """
cat sql/update_${DATA}_hist.sql | bq query \
--nouse_legacy_sql \
--parameter execute_date:DATE:"${params.EXECUTE_DATE}"
"""
}
}
}
}
}
stage("Data pipeline(stage 2)") {
matrix {
axes {
axis {
name 'OUTPUT_TABLE'
values 'user_info_latest', 'content_info_latest', 'content_info_view_change'
}
}
stages {
stage("Update GDS table"){
steps{
sh """
cat sql/overwrite_${OUTPUT_TABLE}.sql | bq query \
--nouse_legacy_sql \
--parameter execute_date:DATE:"${params.EXECUTE_DATE}"
"""
}
}
}
}
}
stage("House keeping"){
steps{
sh "python3 mongo_client.py -c user_info housekeeping"
sh "python3 mongo_client.py -c content_info housekeeping"
}
}
}
post{
always{
archiveArtifacts artifacts: 'output/**', followSymlinks: false
}
}
}
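One more point before the notes: tying back to the rerun requirement at the top, a backfill is just a build with EXECUTE_DATE set to a past date. Besides "Build with Parameters" in the UI, it can be triggered remotely through Jenkins' buildWithParameters endpoint; a sketch (the host, job name, and credentials are placeholders):

# re-run the pipeline for 2022-10-01 (hypothetical host/job/credentials)
curl -X POST "https://jenkins.example.com/job/ithome-data-pipeline/buildWithParameters" \
    --user "user:api_token" \
    --data "EXECUTE_DATE=2022-10-01"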
A few notes on this Jenkinsfile:
- MONGO_HOST points at mongodb://localhost:27017, i.e. the MongoDB instance running on the Jenkins agent itself.
- The agent has to be able to run the bq and gcloud commands.
- The Jenkins user on the agent has sudo privileges (in some situations this is not a great setup).
- In the "Check data quality" stage, PipeRider ends up running inside Docker because the Jenkins agent ships with only Python 3.6, while PipeRider needs at least Python 3.7 to install, so the data quality analysis runs on top of the official Docker image. Using Docker introduced some path issues, which is why the second half of that stage re-creates the symlink.

Having spent the past few days walking through the CI/CD pipeline and the data pipeline step by step, tomorrow I will introduce the final analysis report in Google Data Studio.
https://towardsdatascience.com/15-essential-steps-to-build-reliable-data-pipelines-58847cb5d92f
https://learn.microsoft.com/zh-tw/power-bi/guidance/star-schema
https://stackoverflow.com/questions/53712774/jenkins-date-parameter-plugin-how-to-use-it-in-a-declarative-pipeline